Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

分布式锁解决司机抢单

因为上面写的司机抢单并没有考虑并发,类似于电商的超卖问题

**解决方案

  • 第一种 设置数据库事务的隔离级别,设置为Serializable,效率低下

  • 第二种 使用乐观锁解决,通过版本号进行控制

  • 第三种 加锁解决,学习过synchronized 及lock锁,本地锁,目前微服务架构,分布式部署方式。

本地锁的局限性

  • 我们使用锁一般都是,synchronized 及lock锁,这些都是本地锁,只在当前jvm生效,在微服务里面就是只有当个微服务生效
  • 举例演示
    TestController
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Tag(name = "测试接口")  
    @RestController
    @RequestMapping("/order/test")
    public class TestController {

    @Autowired
    private TestService testService;

    @GetMapping("testLock")
    public Result testLock() {
    testService.testLock();
    return Result.ok();
    }
    }

TestServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service  
public class TestServiceImpl implements TestService{

@Autowired
private StringRedisTemplate redisTemplate;

@Override
public synchronized void testLock() {
//从redis里面获取数据
String value = redisTemplate.opsForValue().get("num");

if(StringUtils.isBlank(value)) {
return;
}

//把从redis获取数据+1
int num = Integer.parseInt(value);

//数据+1之后放回到redis里面
redisTemplate.opsForValue().set("num",String.valueOf(++num));
}
}
  • 测试,模拟200个请求并发过程

  • 在redis添加初始值,num = 0

使用测试工具 jmeter 实现功能测试

![[Pasted image 20250605203137.png]]
![[Pasted image 20250605203153.png]]

加上锁 synchronized 后是没有问题

  • 上面的测试方式是在一个服务内进行的,单机测试,但是如果部署到集群下这个锁不一定会生效

实现分布式锁-redis

设置过期时间,到时间之后自动释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void testLock() {
//从redis里面获取数据
//1 获取当前锁 setnx
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");

//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
if(ifAbsent) {
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));

//3 释放锁
redisTemplate.delete("lock");
} else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

问题:如果锁的删除时间小于事物的执行时间
例如:

  • 场景:如果业务逻辑执行的时间是7s
  • index1业务逻辑没执行完,3秒后释放
  • index2获取到锁,执行业务逻辑,3秒后锁会被自动释放
  • index3获取到锁,执行业务逻辑
  • index1业务逻辑完成,开始调用del释放锁,这是释放的是index3的锁,导致index3的业务只执行1s就被别人释放

修改后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//uuid防止误删
@Override
public void testLock() {
//从redis里面获取数据
String uuid = UUID.randomUUID().toString();
//1 获取当前锁 setnx + 设置过期时间
// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
Boolean ifAbsent =
redisTemplate.opsForValue()
.setIfAbsent("lock", uuid,10, TimeUnit.SECONDS);

//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
if(ifAbsent) {
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//出现异常

//3 释放锁
String redisUuid = redisTemplate.opsForValue().get("lock");
if(uuid.equals(redisUuid)) {
redisTemplate.delete("lock");
}

} else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

通过LUA脚本保证原子性

通过uuid防止误删,但是还是有问题,不具备原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Override
public void testLock() {
//从redis里面获取数据
String uuid = UUID.randomUUID().toString();
//1 获取当前锁 setnx + 设置过期时间
// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
Boolean ifAbsent =
redisTemplate.opsForValue()
.setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);

//2 如果获取到锁,从redis获取数据 数据+1 放回redis里面
if(ifAbsent) {
//获取锁成功,执行业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
//出现异常

//3 释放锁 lua脚本实现
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// lua脚本
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
redisScript.setScriptText(script);
// 设置返回结果
redisScript.setResultType(Long.class);
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
} else {
try {
Thread.sleep(100);
this.testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

总结

1、加锁

1
2
3
4
// 1. 从Redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
.setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);

2、使用lua释放锁

1
2
3
4
5
6
7
8
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);

3、重试

1
2
Thread.sleep(500); 
testLock();

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

第一个:互斥性,在任何时刻,只有一个客户端能持有锁。

第二个:不会发生死锁,即使有一个客户端在获取锁操作时候崩溃了,也能保证其他客户端能获取到锁。

第三个:解铃还须系铃人,解锁加锁必须同一个客户端操作。

第四个:加锁和解锁必须具备原子性

实现分布式锁-Redisson

准备工作

引入依赖

在common里service-utils引入依赖

1
2
3
4
<dependency>  
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>

创建Redisson配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Data  
@Configuration
@ConfigurationProperties(prefix = "spring.data.redis")
public class RedissonConfig {

private String host;

private String password;

private String port;

private int timeout = 3000;
private static String ADDRESS_PREFIX = "redis://";

@Bean
RedissonClient redissonSingle() {
Config config = new Config();

if(!StringUtils.hasText(host)){
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout);
if(StringUtils.hasText(this.password)) {
serverConfig.setPassword(this.password);
}
return Redisson.create(config);
}
}

在业务方法编写加锁和解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Redisson实现  
@Override
public void testLock() {

// 通过redisson创建锁对象
RLock lock = redissonClient.getLock("lock1");

// 尝试获取锁
// 阻塞一直等待直到获取到,获取锁之后设置锁的超时时间为10秒,没有参数默认是30秒
lock.lock(10, TimeUnit.SECONDS);

// tryLock,设置等待时间为30秒,超时时间为10秒
// try {
// boolean b = lock.tryLock(30, 10, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }

//编写业务代码
//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0
String value = redisTemplate.opsForValue().get("num");
//2.如果值为空则非法直接返回即可
if (StringUtils.isBlank(value)) {
return;
}
//3.对num值进行自增加一
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));

//释放锁
lock.unlock();
}

看门狗原理

只要线程加锁成功,就会启动一个watch dog看门狗,它是一个后台进程,每隔十秒检查一下,如果线程还持有锁,那么就会不断延长锁key的生存时间,因此可以解决锁过期释放,业务还没完成的问题

  • 若使用 tryLock() 方法并指定了租约时间,则不会启动看门狗
  • 如果我们未指定超时时间,就会使用 lockwatchdogTimeout = 30 * 1000

评论